73692f39fffbf9b3cd212ab63497ba8d95019ee8,components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java,MinaComponent,createDatagramEndpoint,#String#MinaConfiguration#,254
Before Change
boolean sync = configuration.isSync();
List<IoFilter> filters = configuration.getFilters();
IoAcceptor acceptor = new DatagramAcceptor(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramAcceptor"));
IoConnector connector = new DatagramConnector(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramConnector"));
SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
After Change
boolean sync = configuration.isSync();
List<IoFilter> filters = configuration.getFilters();
ExecutorService acceptorPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaDatagramAcceptor");
ExecutorService connectorPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaDatagramConnector");
ExecutorService workerPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaThreadPool");
IoAcceptor acceptor = new DatagramAcceptor(acceptorPool);
IoConnector connector = new DatagramConnector(connectorPool);
SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
if (transferExchange) {
throw new IllegalArgumentException("transferExchange=true is not supported for datagram protocol");
}
DatagramConnectorConfig connectorConfig = new DatagramConnectorConfig();
// must use manual thread model according to Mina documentation
connectorConfig.setThreadModel(ThreadModel.MANUAL);
configureDataGramCodecFactory("MinaProducer", connectorConfig, configuration);
connectorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
if (minaLogger) {
connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
appendIoFiltersToChain(filters, connectorConfig.getFilterChain());
// set connect timeout to mina in seconds
connectorConfig.setConnectTimeout((int) (timeout / 1000));
DatagramAcceptorConfig acceptorConfig = new DatagramAcceptorConfig();
// must use manual thread model according to Mina documentation
acceptorConfig.setThreadModel(ThreadModel.MANUAL);
configureDataGramCodecFactory("MinaConsumer", acceptorConfig, configuration);
acceptorConfig.setDisconnectOnUnbind(true);
// reuse address is default true for datagram
acceptorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
if (minaLogger) {
acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
appendIoFiltersToChain(filters, acceptorConfig.getFilterChain());
MinaEndpoint endpoint = new MinaEndpoint(uri, this);
endpoint.setAddress(address);
endpoint.setAcceptor(acceptor);
endpoint.setAcceptorConfig(acceptorConfig);
endpoint.setConnector(connector);
endpoint.setConnectorConfig(connectorConfig);
endpoint.setConfiguration(configuration);
// enlist threads pools in use on endpoint
endpoint.addThreadPool(acceptorPool);
endpoint.addThreadPool(connectorPool);
endpoint.addThreadPool(workerPool);